package com.jokerku.mini.schedule.task;

import com.alibaba.fastjson.JSON;
import com.jokerku.mini.schedule.common.Constants;
import com.jokerku.mini.schedule.domain.ExecOrder;
import com.jokerku.mini.schedule.domain.ZkPath;
import com.jokerku.mini.schedule.sharding.ShardingContext;
import com.jokerku.mini.schedule.sharding.ShardingService;
import com.jokerku.mini.schedule.threadpool.JObExecutorServiceHandlerFactory;
import com.jokerku.mini.schedule.threadpool.JobExecutorServiceHandler;
import com.jokerku.mini.schedule.util.SleepUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.ExecutorService;

/**
 * Runnable wrapper around a scheduled task method.
 *
 * <p>Each run triggers re-sharding if necessary, reads the shard items stored for the
 * local IP under the task's running node in ZooKeeper, and submits one reflective
 * invocation of the target method per shard item to an executor service.
 *
 * @Author: guzq
 * @CreateTime: 2023/05/25 11:30
 * @Version: 1.0
 */
public class ScheduleRunnable implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(ScheduleRunnable.class);

    /**
     * Executor handler key used when exactly one shard item is assigned to this node.
     */
    private static final String SINGLE_SHARD_HANDLER = "SIGNAL";
    /**
     * Executor handler key used when several shard items are assigned to this node.
     */
    private static final String MULTI_SHARD_HANDLER = "CPU";
    /**
     * Maximum number of existence checks performed on the running-task path.
     */
    private static final int MAX_TASK_PATH_CHECKS = 5;

    /**
     * Target object whose method is invoked.
     */
    private final Object bean;
    /**
     * Bean name, taken from the exec order.
     */
    private final String beanName;
    /**
     * Name of the method to invoke on the bean.
     */
    private final String methodName;
    /**
     * Task descriptor (bean/method names, sharding total count, job parameter).
     */
    private final ExecOrder execOrder;

    /**
     * @param bean      target object the task method is invoked on
     * @param execOrder task descriptor supplying the bean name, method name and sharding settings
     */
    public ScheduleRunnable(Object bean, ExecOrder execOrder) {
        this.bean = bean;
        this.execOrder = execOrder;
        this.beanName = execOrder.getBeanName();
        this.methodName = execOrder.getMethodName();
    }

    /**
     * @return the task id, built as bean name + {@code Constants.Global.HORIZONTAL_LINE} + method name
     */
    public String getTaskId() {
        return beanName + Constants.Global.HORIZONTAL_LINE + methodName;
    }

    /**
     * Runs one scheduling round. Never throws an {@link Exception}: every failure,
     * including a failure while re-sharding, is logged instead, so an exception cannot
     * escape into the scheduler that drives this runnable.
     */
    @Override
    public void run() {
        try {
            ShardingService.getInstance().shardingIfNecessary();

            CuratorFramework client = Constants.Global.client;
            String taskId = getTaskId();

            // Skip this round when the running-task node never shows up.
            if (checkTaskPathAndRetry(client, ZkPath.getRunningTaskPath(taskId))) {
                return;
            }

            // No node for the local IP: nothing is assigned to this instance.
            String taskIpPath = ZkPath.getRunningTaskIpPath(taskId, Constants.Global.ip);
            if (null == client.checkExists().forPath(taskIpPath)) {
                return;
            }

            byte[] data = client.getData().forPath(taskIpPath);
            if (data == null || data.length == 0) {
                logger.warn("mini schedule found no shard data at {}", taskIpPath);
                return;
            }

            // parseArray yields null for blank or "null" input, so guard before using the list.
            List<Integer> shardItems = JSON.parseArray(new String(data, Constants.Global.CHARSET), Integer.class);
            if (shardItems == null || shardItems.isEmpty()) {
                return;
            }

            String handlerKey = shardItems.size() == 1 ? SINGLE_SHARD_HANDLER : MULTI_SHARD_HANDLER;
            ExecutorService executorService =
                    JObExecutorServiceHandlerFactory.getHandler(handlerKey).createExecutorService(taskId);

            for (Integer shardItem : shardItems) {
                submitShard(executorService, taskId, shardItem);
            }
        } catch (Exception e) {
            logger.error("mini schedule error", e);
        }
    }

    /**
     * Submits one invocation of the task method for a single shard item.
     *
     * @param executorService executor the invocation is submitted to
     * @param taskId          task id, passed as the first two arguments of the sharding context
     * @param shardItem       shard item this invocation is responsible for
     */
    private void submitShard(ExecutorService executorService, String taskId, Integer shardItem) {
        ShardingContext shardingContext = new ShardingContext(taskId, taskId,
                execOrder.getShardingTotalCount(), shardItem, execOrder.getJobParameter());

        executorService.submit(() -> {
            // NOTE(review): the context is set on the worker thread and never cleared here;
            // confirm whether ShardingContext needs a reset once the job finishes.
            ShardingContext.setShardingContext(shardingContext);
            try {
                invoke();
            } catch (Exception e) {
                logger.error("mini schedule run error", e);
            }
        });
    }

    /**
     * Invokes the no-argument task method on the bean via reflection.
     *
     * <p>The method is looked up on the bean's class and its superclasses, so a method
     * inherited from a parent class is found as well.
     *
     * @throws NoSuchMethodException if no no-argument method with the configured name exists
     * @throws Exception             any failure raised by the reflective call
     */
    private void invoke() throws Exception {
        Method method = ReflectionUtils.findMethod(bean.getClass(), methodName);
        if (method == null) {
            throw new NoSuchMethodException(bean.getClass().getName() + "." + methodName + "()");
        }
        ReflectionUtils.makeAccessible(method);
        method.invoke(bean);
    }

    /**
     * Checks whether the running-task path exists, re-checking up to
     * {@code MAX_TASK_PATH_CHECKS} times with a {@code SleepUtil.sleep()} pause between checks.
     *
     * @param client   Curator client used for the existence check
     * @param taskPath ZooKeeper path of the running task
     * @return {@code true} when the path is still missing after the last check (the caller
     *         should skip this round), {@code false} as soon as the path exists
     * @throws Exception if the existence check fails
     */
    private boolean checkTaskPathAndRetry(CuratorFramework client, String taskPath) throws Exception {
        for (int attempt = 1; ; attempt++) {
            Stat stat = client.checkExists().forPath(taskPath);
            if (stat != null) {
                return false;
            }
            if (attempt >= MAX_TASK_PATH_CHECKS) {
                return true;
            }
            SleepUtil.sleep();
        }
    }
}
